#include "servicemanager.h"



/**
 * Constructs the manager: initialises the mutex that guards all lookup
 * tables, then gives every table an initial size hint.
 *
 * The sizing call differs per platform (rehash() on Windows, resize() on
 * Linux); on any other platform no sizing call is made.
 */
ServiceManager::ServiceManager()
{
	// Initial size passed to every table's rehash()/resize() call below.
	enum { kInitialTableSize = 0x10000 };

	sp_thread_mutex_init(&mMutex,NULL);

#ifdef _WIN32
	mMapService.rehash(kInitialTableSize);
	mMapUserSubIdList.rehash(kInitialTableSize);
	mMapUserSubInfo.rehash(kInitialTableSize);
	mMapSubDetails.rehash(kInitialTableSize);
#elif __linux__
	mMapService.resize(kInitialTableSize);
	mMapUserSubIdList.resize(kInitialTableSize);
	mMapUserSubInfo.resize(kInitialTableSize);
	mMapSubDetails.resize(kInitialTableSize);
#endif
}
ServiceManager::~ServiceManager()
{
	sp_thread_mutex_destroy(&mMutex);
}
/**
 * Registers session `sid` as a provider of every function id listed in
 * `vecService`. A provider list is created on demand for function ids that
 * do not have one yet. The whole operation runs under mMutex.
 *
 * @param vecService function ids offered by the session
 * @param sid        session id to add to each provider list
 */
void ServiceManager::regService(vector<int>& vecService,SP_Sid_t sid)
{
	sp_thread_mutex_lock( &mMutex );
	for(vector<int>::iterator svc=vecService.begin();svc!=vecService.end();++svc)
	{
		const int functionId=*svc;
		UserList* providers=mMapService[functionId];
		if(!providers)
		{
			// First provider for this function: create and store its list.
			providers=new UserList();
			mMapService[functionId]=providers;
		}
		providers->add(sid);
	}
	sp_thread_mutex_unlock( &mMutex );
}

void ServiceManager::unregService(std::set<int>& setService,SP_Sid_t sid)
{
	sp_thread_mutex_lock( &mMutex );
	set<int>::iterator it=setService.begin();
	while(it!=setService.end())
	{
		int funcIdx=*it;
		UserList* list=mMapService[funcIdx];
		if(list==NULL) {
			it++;
			continue;
		}
		list->remove(sid);
		if(list->getCount()==0)
		{
			delete list;
			mMapService.erase(funcIdx);
		}
		it++;
	}
	sp_thread_mutex_unlock( &mMutex );
}
/**
 * Sorts `subInfo[0..count)` in place by ascending key().
 *
 * Each pass walks one element down towards the front, swapping adjacent
 * items whenever the later one has a strictly smaller key, so items with
 * equal keys keep their relative order.
 *
 * @param subInfo array of items to reorder
 * @param count   number of items in the array
 */
void sort(MsgExpress::DataItem* subInfo,int count)
{
	if(count<2) return;
	for(int pass=1;pass<count;++pass)
	{
		int pos=pass;
		while(pos>0)
		{
			MsgExpress::DataItem& upper=subInfo[pos];
			MsgExpress::DataItem& lower=subInfo[pos-1];
			if(upper.key()<lower.key())
			{
				// Swap the adjacent pair through a temporary copy.
				MsgExpress::DataItem saved(lower);
				lower=upper;
				upper=saved;
			}
			--pos;
		}
	}
}
/**
 * Copies the usable conditions of a subscription into a newly allocated
 * array sorted by key.
 *
 * A condition is usable when it has at least one value and a non-zero key.
 *
 * @param sub     subscription whose conditions are inspected
 * @param subInfo out: array allocated with new[] holding the usable
 *                conditions sorted by key, or NULL when there are none;
 *                the caller must release it with delete[]
 * @return number of entries in `subInfo`
 */
int getSortedSub(const MsgExpress::SubscribeData& sub,MsgExpress::DataItem*& subInfo)
{
	subInfo=NULL;
	const int total=sub.condition_size();

	// First pass: count the usable conditions so the array can be sized.
	int usable=0;
	for(int idx=0;idx<total;++idx)
	{
		const MsgExpress::DataItem& cond=sub.condition(idx);
		if(cond.value_size()<=0 || cond.key()==0)
			continue;
		++usable;
	}
	if(usable<1)
		return usable;

	// Second pass: copy them in their original order, then sort by key.
	subInfo=new MsgExpress::DataItem[usable];
	int filled=0;
	for(int idx=0;idx<total;++idx)
	{
		const MsgExpress::DataItem& cond=sub.condition(idx);
		if(cond.value_size()<=0 || cond.key()==0)
			continue;
		subInfo[filled]=cond;
		++filled;
	}
	sort(subInfo,filled);
	return filled;
}
/**
 * Copies the usable items of a publication into a newly allocated array
 * sorted by key. Each copied item gets one extra empty-string value
 * appended; generateSubConditions() in publish mode treats the last value
 * of an item as the "leave this key out" alternative.
 *
 * An item is usable when it has at least one value and a non-zero key.
 *
 * @param pub     publication whose items are inspected (must not be NULL)
 * @param subInfo out: array allocated with new[] holding the usable items
 *                sorted by key, or NULL when there are none; the caller
 *                must release it with delete[]
 * @return number of entries in `subInfo`
 */
int getSortedPub(MsgExpress::PublishData* pub,MsgExpress::DataItem*& subInfo)
{
	subInfo=NULL;
	const int total=pub->item_size();

	// First pass: count the usable items so the array can be sized.
	int usable=0;
	for(int idx=0;idx<total;++idx)
	{
		const MsgExpress::DataItem& entry=pub->item(idx);
		if(entry.value_size()<=0 || entry.key()==0)
			continue;
		++usable;
	}
	if(usable<1)
		return usable;

	// Second pass: copy each usable item and append the empty marker value.
	subInfo=new MsgExpress::DataItem[usable];
	int filled=0;
	for(int idx=0;idx<total;++idx)
	{
		const MsgExpress::DataItem& entry=pub->item(idx);
		if(entry.value_size()<=0 || entry.key()==0)
			continue;
		MsgExpress::DataItem& slot=subInfo[filled];
		slot=entry;
		slot.add_value("");
		++filled;
	}
	sort(subInfo,filled);
	return filled;
}
void generateSubConditions(int topic,MsgExpress::DataItem* subInfo,int count,bool isPub,vector<string>& vecResult)
{
	vector<string> vecTemp;
	char buf[256];
	memset(buf,0,sizeof(buf));
	sprintf(buf,"0x%x",topic);
	vecResult.clear();
	vecResult.push_back(buf);
	for(int i=0;i<count;i++)
	{
		const MsgExpress::DataItem& item=subInfo[i];
		int key=item.key();
		int length=item.value_size();
		for(int k=0;k<vecResult.size();k++)
		{
			for(int j=0;j<length;j++)
			{
				if(isPub && j==length-1)
					vecTemp.push_back(vecResult[k]);
				else
				{
				    sprintf_s<sizeof(buf)>(buf,"%s&%d=%s",vecResult[k].c_str(),key,item.value(j).c_str());
				    vecTemp.push_back(buf);
				}
			}
		}
		vecResult.clear();
		vecResult=vecTemp;
		vecTemp.clear();
	}
}
/**
 * Records subscription `sub` for session `sid` and indexes it under every
 * lookup key produced by generateSubConditions().
 *
 * A subscription is identified by "<sid>:<sub.id()>"; if one with that
 * identity is already stored the call does nothing.
 *
 * The lookup keys are computed without holding mMutex. Because the lock is
 * released between the first "already subscribed" check and the insert,
 * the check is repeated after the lock is re-acquired; otherwise two
 * concurrent calls for the same identity could both insert, leaking the
 * first stored copy and duplicating the index entries.
 *
 * @param sub subscription to store (a copy is kept)
 * @param sid session id of the subscriber
 */
void ServiceManager::subscribe(const MsgExpress::SubscribeData& sub,unsigned int sid)
{
	// NOTE: "%d" is kept for sid on purpose so the key text stays identical
	// to the one built by the other functions that format this key.
	char subKey[32];
	memset(subKey,0,sizeof(subKey));
	sprintf(subKey,"%d:%d",sid,sub.id());

	sp_thread_mutex_lock( &mMutex );
	const bool hasSubed=(mMapUserSubInfo[subKey]!=NULL);
	sp_thread_mutex_unlock( &mMutex );
	if(hasSubed)
		return;

	// Expand the conditions into lookup keys outside the lock.
	MsgExpress::DataItem* subInfo=NULL;
	int count=getSortedSub(sub,subInfo);
	vector<string> vecResult;
	generateSubConditions(sub.topic(),subInfo,count,false,vecResult);
	delete []subInfo;

	sp_thread_mutex_lock( &mMutex );
	if(mMapUserSubInfo[subKey]!=NULL)
	{
		// Another thread stored the same subscription while we were unlocked.
		sp_thread_mutex_unlock( &mMutex );
		return;
	}

	// Remember the subscription id in the session's id list.
	vector<int>* subIds=mMapUserSubIdList[sid];
	if(subIds==NULL){
		subIds=new vector<int>;
		mMapUserSubIdList[sid]=subIds;
	}
	subIds->push_back(sub.id());

	// Keep a private copy of the subscription itself.
	mMapUserSubInfo[subKey]=new MsgExpress::SubscribeData(sub);

	// Index the subscription id under every generated key.
	for(vector<string>::size_type i=0;i<vecResult.size();i++)
	{
		MapSubscription* subscription=mMapSubDetails[vecResult[i]];
		if(subscription==NULL){
			subscription=new MapSubscription();
#ifdef _WIN32
			subscription->rehash(0x400);
#elif __linux__
			subscription->resize(0x400);
#endif
			mMapSubDetails[vecResult[i]]=subscription;
		}
		vector<int>* userSubIds=(*subscription)[sid];
		if(userSubIds==NULL)
		{
			userSubIds=new vector<int>();
			(*subscription)[sid]=userSubIds;
		}
		userSubIds->push_back(sub.id());
	}
	sp_thread_mutex_unlock( &mMutex );
}

void ServiceManager::unSubscribe(int subId,unsigned int sid)
{
	char subKey[32];
	memset(subKey,0,sizeof(subKey));
	sprintf(subKey,"%d:%d",sid,subId);
	sp_thread_mutex_lock( &mMutex );
	MsgExpress::SubscribeData* sub=mMapUserSubInfo[subKey];
	sp_thread_mutex_unlock( &mMutex );
	if(!sub)
		return;
	MsgExpress::DataItem* subInfo=NULL;
	int count=getSortedSub(*sub,subInfo);
	vector<string> vecResult;
	generateSubConditions(sub->topic(),subInfo,count,false,vecResult);
	delete []subInfo;

	sp_thread_mutex_lock( &mMutex );
	delete mMapUserSubInfo[subKey];
	mMapUserSubInfo[subKey]=NULL;
	vector<int>* subIds=mMapUserSubIdList[sid];
	if(subIds!=NULL){
		vector<int>::iterator it=subIds->begin();
		while(it!=subIds->end())
		{
			if(*it==subId)
			{
				subIds->erase(it);
				break;
			}
			it++;
		}
	}
	for(int i=0;i<vecResult.size();i++)
	{
	    MapSubscription* subscription=mMapSubDetails[vecResult[i]];
		if(subscription!=NULL){
			vector<int>* userSubIds=(*subscription)[sid];
			if(userSubIds!=NULL)
			{
				vector<int>::iterator it=userSubIds->begin();
				while(it!=userSubIds->end())
				{
					if(*it==subId)
					{
						userSubIds->erase(it);
						break;
					}
					it++;
				}
				if(userSubIds->size()==0)
				{
					MapSubscription::iterator iter=subscription->find(sid);
					subscription->erase(iter);
					delete userSubIds;
				}
			}
		}
	}
	sp_thread_mutex_unlock( &mMutex );
}
void ServiceManager::unSubscribeAll(unsigned int sid)
{
	vector<int> vecTemp;
	sp_thread_mutex_lock( &mMutex );
	vector<int>* subIds=mMapUserSubIdList[sid];
	if(subIds!=NULL){
		vecTemp=*subIds;
	}
	sp_thread_mutex_unlock( &mMutex );
	vector<int>::iterator it=vecTemp.begin();
	while(it!=vecTemp.end())
	{
		unSubscribe(*it,sid);
		it++;
	}
}
/**
 * Collects, per user, copies of the subscriptions that match a publication.
 *
 * The publication is expanded into lookup keys (computed outside the lock)
 * and every key is looked up in the subscription index under mMutex.
 *
 * Differences from the previous implementation:
 *  - A user matching through more than one key keeps ONE result vector that
 *    accumulates all matches. Before, the vector was replaced for every
 *    matching key, leaking the earlier vector and its copies and dropping
 *    those matches.
 *  - A subscription is listed at most once per user even if it matches
 *    through several keys.
 *  - Keys without subscribers are no longer inserted into the index as NULL
 *    entries, so publishing does not grow the index.
 *  - NULL per-user id lists in the index are skipped instead of being
 *    dereferenced.
 *
 * @param pubData        publication to match (must not be NULL)
 * @param mapUserSubList out: user id -> vector of newly allocated copies of
 *                       the matching subscriptions; both the vectors and
 *                       the copies are allocated with new and are left for
 *                       the caller to release
 */
void ServiceManager::getSubscriberList(MsgExpress::PublishData* pubData,map<int,vector<MsgExpress::SubscribeData*>*>& mapUserSubList){
	int topic=pubData->topic();
	MsgExpress::DataItem* subInfo=NULL;
	int count=getSortedPub(pubData,subInfo);
	vector<string> vecResult;
	generateSubConditions(topic,subInfo,count,true,vecResult);
	delete []subInfo;

	sp_thread_mutex_lock( &mMutex );
	for(vector<string>::size_type i=0;i<vecResult.size();i++)
	{
		// count() first: operator[] alone would insert a NULL entry for
		// every published key that nobody subscribed to.
		if(mMapSubDetails.count(vecResult[i])==0)
			continue;
		MapSubscription* subscription=mMapSubDetails[vecResult[i]];
		if(subscription==NULL)
			continue;

		for(MapSubscription::iterator it=subscription->begin();it!=subscription->end();++it)
		{
			vector<int>* userSubIds=it->second;
			if(userSubIds==NULL)
				continue;
			int userId=it->first;

			// Reuse the user's vector if an earlier key already created it.
			vector<MsgExpress::SubscribeData*>*& vecSubData=mapUserSubList[userId];
			if(vecSubData==NULL)
				vecSubData=new vector<MsgExpress::SubscribeData*>();

			for(vector<int>::size_type j=0;j<userSubIds->size();j++)
			{
				int subId=(*userSubIds)[j];

				// Skip subscriptions already collected for this user.
				bool alreadyListed=false;
				for(vector<MsgExpress::SubscribeData*>::size_type k=0;k<vecSubData->size();k++)
				{
					if((*vecSubData)[k]->id()==subId)
					{
						alreadyListed=true;
						break;
					}
				}
				if(alreadyListed)
					continue;

				char subKey[32];
				memset(subKey,0,sizeof(subKey));
				sprintf(subKey,"%d:%d",userId,subId);
				MsgExpress::SubscribeData* subData=mMapUserSubInfo[subKey];
				if(subData)
					vecSubData->push_back(new MsgExpress::SubscribeData(*subData));
			}
		}
	}
	sp_thread_mutex_unlock( &mMutex );
}
void ServiceManager::getServiceProviderList(int funcId,SP_SidList* list){
	if(list==NULL)
		return;
	sp_thread_mutex_lock( &mMutex );
	if(mMapService[funcId]!=NULL)
		mMapService[funcId]->copy(list);
	sp_thread_mutex_unlock( &mMutex );
}